# mypy: allow-untyped-defs, allow-incomplete-defs, allow-untyped-calls
# mypy: no-warn-return-any, allow-any-generics

from __future__ import annotations

from io import StringIO
import re
from typing import Any
from typing import cast
from typing import Dict
from typing import List
from typing import Optional
from typing import Tuple
from typing import TYPE_CHECKING
from typing import Union

from mako.pygen import PythonPrinter
from sqlalchemy import schema as sa_schema
from sqlalchemy import sql
from sqlalchemy import types as sqltypes
from sqlalchemy.sql.elements import conv
from sqlalchemy.sql.elements import quoted_name

from .. import util
from ..operations import ops
from ..util import sqla_compat

if TYPE_CHECKING:
    from typing import Literal

    from sqlalchemy.sql.base import DialectKWArgs
    from sqlalchemy.sql.elements import ColumnElement
    from sqlalchemy.sql.elements import TextClause
    from sqlalchemy.sql.schema import CheckConstraint
    from sqlalchemy.sql.schema import Column
    from sqlalchemy.sql.schema import Constraint
    from sqlalchemy.sql.schema import FetchedValue
    from sqlalchemy.sql.schema import ForeignKey
    from sqlalchemy.sql.schema import ForeignKeyConstraint
    from sqlalchemy.sql.schema import Index
    from sqlalchemy.sql.schema import MetaData
    from sqlalchemy.sql.schema import PrimaryKeyConstraint
    from sqlalchemy.sql.schema import UniqueConstraint
    from sqlalchemy.sql.sqltypes import ARRAY
    from sqlalchemy.sql.type_api import TypeEngine

    from alembic.autogenerate.api import AutogenContext
    from alembic.config import Config
    from alembic.operations.ops import MigrationScript
    from alembic.operations.ops import ModifyTableOps
    from alembic.util.sqla_compat import Computed
    from alembic.util.sqla_compat import Identity


MAX_PYTHON_ARGS = 255


def _render_gen_name(
    autogen_context: AutogenContext,
    name: sqla_compat._ConstraintName,
) -> Optional[Union[quoted_name, str, _f_name]]:
    if isinstance(name, conv):
        return _f_name(_alembic_autogenerate_prefix(autogen_context), name)
    else:
        return sqla_compat.constraint_name_or_none(name)


def _indent(text: str) -> str:
    text = re.compile(r"^", re.M).sub("    ", text).strip()
    text = re.compile(r" +$", re.M).sub("", text)
    return text


def _render_python_into_templatevars(
    autogen_context: AutogenContext,
    migration_script: MigrationScript,
    template_args: Dict[str, Union[str, Config]],
) -> None:
    imports = autogen_context.imports

    for upgrade_ops, downgrade_ops in zip(
        migration_script.upgrade_ops_list, migration_script.downgrade_ops_list
    ):
        template_args[upgrade_ops.upgrade_token] = _indent(
            _render_cmd_body(upgrade_ops, autogen_context)
        )
        template_args[downgrade_ops.downgrade_token] = _indent(
            _render_cmd_body(downgrade_ops, autogen_context)
        )
    template_args["imports"] = "\n".join(sorted(imports))


default_renderers = renderers = util.Dispatcher()


def _render_cmd_body(
    op_container: ops.OpContainer,
    autogen_context: AutogenContext,
) -> str:
    buf = StringIO()
    printer = PythonPrinter(buf)

    printer.writeline(
        "# ### commands auto generated by Alembic - please adjust! ###"
    )

    has_lines = False
    for op in op_container.ops:
        lines = render_op(autogen_context, op)
        has_lines = has_lines or bool(lines)

        for line in lines:
            printer.writeline(line)

    if not has_lines:
        printer.writeline("pass")

    printer.writeline("# ### end Alembic commands ###")

    return buf.getvalue()


def render_op(
    autogen_context: AutogenContext, op: ops.MigrateOperation
) -> List[str]:
    renderer = renderers.dispatch(op)
    lines = util.to_list(renderer(autogen_context, op))
    return lines


def render_op_text(
    autogen_context: AutogenContext, op: ops.MigrateOperation
) -> str:
    return "\n".join(render_op(autogen_context, op))


@renderers.dispatch_for(ops.ModifyTableOps)
def _render_modify_table(
    autogen_context: AutogenContext, op: ModifyTableOps
) -> List[str]:
    opts = autogen_context.opts
    render_as_batch = opts.get("render_as_batch", False)

    if op.ops:
        lines = []
        if render_as_batch:
            with autogen_context._within_batch():
                lines.append(
                    "with op.batch_alter_table(%r, schema=%r) as batch_op:"
                    % (op.table_name, op.schema)
                )
                for t_op in op.ops:
                    t_lines = render_op(autogen_context, t_op)
                    lines.extend(t_lines)
                lines.append("")
        else:
            for t_op in op.ops:
                t_lines = render_op(autogen_context, t_op)
                lines.extend(t_lines)

        return lines
    else:
        return []


@renderers.dispatch_for(ops.CreateTableCommentOp)
def _render_create_table_comment(
    autogen_context: AutogenContext, op: ops.CreateTableCommentOp
) -> str:
    if autogen_context._has_batch:
        templ = (
            "{prefix}create_table_comment(\n"
            "{indent}{comment},\n"
            "{indent}existing_comment={existing}\n"
            ")"
        )
    else:
        templ = (
            "{prefix}create_table_comment(\n"
            "{indent}'{tname}',\n"
            "{indent}{comment},\n"
            "{indent}existing_comment={existing},\n"
            "{indent}schema={schema}\n"
            ")"
        )
    return templ.format(
        prefix=_alembic_autogenerate_prefix(autogen_context),
        tname=op.table_name,
        comment="%r" % op.comment if op.comment is not None else None,
        existing=(
            "%r" % op.existing_comment
            if op.existing_comment is not None
            else None
        ),
        schema="'%s'" % op.schema if op.schema is not None else None,
        indent="    ",
    )


@renderers.dispatch_for(ops.DropTableCommentOp)
def _render_drop_table_comment(
    autogen_context: AutogenContext, op: ops.DropTableCommentOp
) -> str:
    if autogen_context._has_batch:
        templ = (
            "{prefix}drop_table_comment(\n"
            "{indent}existing_comment={existing}\n"
            ")"
        )
    else:
        templ = (
            "{prefix}drop_table_comment(\n"
            "{indent}'{tname}',\n"
            "{indent}existing_comment={existing},\n"
            "{indent}schema={schema}\n"
            ")"
        )
    return templ.format(
        prefix=_alembic_autogenerate_prefix(autogen_context),
        tname=op.table_name,
        existing=(
            "%r" % op.existing_comment
            if op.existing_comment is not None
            else None
        ),
        schema="'%s'" % op.schema if op.schema is not None else None,
        indent="    ",
    )


@renderers.dispatch_for(ops.CreateTableOp)
def _add_table(autogen_context: AutogenContext, op: ops.CreateTableOp) -> str:
    table = op.to_table()

    args = [
        col
        for col in [
            _render_column(col, autogen_context) for col in table.columns
        ]
        if col
    ] + sorted(
        [
            rcons
            for rcons in [
                _render_constraint(
                    cons, autogen_context, op._namespace_metadata
                )
                for cons in table.constraints
            ]
            if rcons is not None
        ]
    )

    if len(args) > MAX_PYTHON_ARGS:
        args_str = "*[" + ",\n".join(args) + "]"
    else:
        args_str = ",\n".join(args)

    text = "%(prefix)screate_table(%(tablename)r,\n%(args)s" % {
        "tablename": _ident(op.table_name),
        "prefix": _alembic_autogenerate_prefix(autogen_context),
        "args": args_str,
    }
    if op.schema:
        text += ",\nschema=%r" % _ident(op.schema)

    comment = table.comment
    if comment:
        text += ",\ncomment=%r" % _ident(comment)

    info = table.info
    if info:
        text += f",\ninfo={info!r}"

    for k in sorted(op.kw):
        text += ",\n%s=%r" % (k.replace(" ", "_"), op.kw[k])

    if table._prefixes:
        prefixes = ", ".join("'%s'" % p for p in table._prefixes)
        text += ",\nprefixes=[%s]" % prefixes

    if op.if_not_exists is not None:
        text += ",\nif_not_exists=%r" % bool(op.if_not_exists)

    text += "\n)"
    return text


@renderers.dispatch_for(ops.DropTableOp)
def _drop_table(autogen_context: AutogenContext, op: ops.DropTableOp) -> str:
    text = "%(prefix)sdrop_table(%(tname)r" % {
        "prefix": _alembic_autogenerate_prefix(autogen_context),
        "tname": _ident(op.table_name),
    }
    if op.schema:
        text += ", schema=%r" % _ident(op.schema)

    if op.if_exists is not None:
        text += ", if_exists=%r" % bool(op.if_exists)

    text += ")"
    return text


def _render_dialect_kwargs_items(
    autogen_context: AutogenContext, item: DialectKWArgs
) -> list[str]:
    return [
        f"{key}={_render_potential_expr(val, autogen_context)}"
        for key, val in item.dialect_kwargs.items()
    ]


@renderers.dispatch_for(ops.CreateIndexOp)
def _add_index(autogen_context: AutogenContext, op: ops.CreateIndexOp) -> str:
    index = op.to_index()

    has_batch = autogen_context._has_batch

    if has_batch:
        tmpl = (
            "%(prefix)screate_index(%(name)r, [%(columns)s], "
            "unique=%(unique)r%(kwargs)s)"
        )
    else:
        tmpl = (
            "%(prefix)screate_index(%(name)r, %(table)r, [%(columns)s], "
            "unique=%(unique)r%(schema)s%(kwargs)s)"
        )

    assert index.table is not None

    opts = _render_dialect_kwargs_items(autogen_context, index)
    if op.if_not_exists is not None:
        opts.append("if_not_exists=%r" % bool(op.if_not_exists))
    text = tmpl % {
        "prefix": _alembic_autogenerate_prefix(autogen_context),
        "name": _render_gen_name(autogen_context, index.name),
        "table": _ident(index.table.name),
        "columns": ", ".join(
            _get_index_rendered_expressions(index, autogen_context)
        ),
        "unique": index.unique or False,
        "schema": (
            (", schema=%r" % _ident(index.table.schema))
            if index.table.schema
            else ""
        ),
        "kwargs": ", " + ", ".join(opts) if opts else "",
    }
    return text


@renderers.dispatch_for(ops.DropIndexOp)
def _drop_index(autogen_context: AutogenContext, op: ops.DropIndexOp) -> str:
    index = op.to_index()

    has_batch = autogen_context._has_batch

    if has_batch:
        tmpl = "%(prefix)sdrop_index(%(name)r%(kwargs)s)"
    else:
        tmpl = (
            "%(prefix)sdrop_index(%(name)r, "
            "table_name=%(table_name)r%(schema)s%(kwargs)s)"
        )
    opts = _render_dialect_kwargs_items(autogen_context, index)
    if op.if_exists is not None:
        opts.append("if_exists=%r" % bool(op.if_exists))
    text = tmpl % {
        "prefix": _alembic_autogenerate_prefix(autogen_context),
        "name": _render_gen_name(autogen_context, op.index_name),
        "table_name": _ident(op.table_name),
        "schema": ((", schema=%r" % _ident(op.schema)) if op.schema else ""),
        "kwargs": ", " + ", ".join(opts) if opts else "",
    }
    return text


@renderers.dispatch_for(ops.CreateUniqueConstraintOp)
def _add_unique_constraint(
    autogen_context: AutogenContext, op: ops.CreateUniqueConstraintOp
) -> List[str]:
    return [_uq_constraint(op.to_constraint(), autogen_context, True)]


@renderers.dispatch_for(ops.CreateForeignKeyOp)
def _add_fk_constraint(
    autogen_context: AutogenContext, op: ops.CreateForeignKeyOp
) -> str:
    args = [repr(_render_gen_name(autogen_context, op.constraint_name))]
    if not autogen_context._has_batch:
        args.append(repr(_ident(op.source_table)))

    args.extend(
        [
            repr(_ident(op.referent_table)),
            repr([_ident(col) for col in op.local_cols]),
            repr([_ident(col) for col in op.remote_cols]),
        ]
    )
    kwargs = [
        "referent_schema",
        "onupdate",
        "ondelete",
        "initially",
        "deferrable",
        "use_alter",
        "match",
    ]
    if not autogen_context._has_batch:
        kwargs.insert(0, "source_schema")

    for k in kwargs:
        if k in op.kw:
            value = op.kw[k]
            if value is not None:
                args.append("%s=%r" % (k, value))

    return "%(prefix)screate_foreign_key(%(args)s)" % {
        "prefix": _alembic_autogenerate_prefix(autogen_context),
        "args": ", ".join(args),
    }


@renderers.dispatch_for(ops.CreatePrimaryKeyOp)
def _add_pk_constraint(constraint, autogen_context):
    raise NotImplementedError()


@renderers.dispatch_for(ops.CreateCheckConstraintOp)
def _add_check_constraint(constraint, autogen_context):
    raise NotImplementedError()


@renderers.dispatch_for(ops.DropConstraintOp)
def _drop_constraint(
    autogen_context: AutogenContext, op: ops.DropConstraintOp
) -> str:
    prefix = _alembic_autogenerate_prefix(autogen_context)
    name = _render_gen_name(autogen_context, op.constraint_name)
    schema = _ident(op.schema) if op.schema else None
    type_ = _ident(op.constraint_type) if op.constraint_type else None

    params_strs = []
    params_strs.append(repr(name))
    if not autogen_context._has_batch:
        params_strs.append(repr(_ident(op.table_name)))
        if schema is not None:
            params_strs.append(f"schema={schema!r}")
    if type_ is not None:
        params_strs.append(f"type_={type_!r}")

    return f"{prefix}drop_constraint({', '.join(params_strs)})"


@renderers.dispatch_for(ops.AddColumnOp)
def _add_column(autogen_context: AutogenContext, op: ops.AddColumnOp) -> str:
    schema, tname, column = op.schema, op.table_name, op.column
    if autogen_context._has_batch:
        template = "%(prefix)sadd_column(%(column)s)"
    else:
        template = "%(prefix)sadd_column(%(tname)r, %(column)s"
        if schema:
            template += ", schema=%(schema)r"
        template += ")"
    text = template % {
        "prefix": _alembic_autogenerate_prefix(autogen_context),
        "tname": tname,
        "column": _render_column(column, autogen_context),
        "schema": schema,
    }
    return text


@renderers.dispatch_for(ops.DropColumnOp)
def _drop_column(autogen_context: AutogenContext, op: ops.DropColumnOp) -> str:
    schema, tname, column_name = op.schema, op.table_name, op.column_name

    if autogen_context._has_batch:
        template = "%(prefix)sdrop_column(%(cname)r)"
    else:
        template = "%(prefix)sdrop_column(%(tname)r, %(cname)r"
        if schema:
            template += ", schema=%(schema)r"
        template += ")"

    text = template % {
        "prefix": _alembic_autogenerate_prefix(autogen_context),
        "tname": _ident(tname),
        "cname": _ident(column_name),
        "schema": _ident(schema),
    }
    return text


@renderers.dispatch_for(ops.AlterColumnOp)
def _alter_column(
    autogen_context: AutogenContext, op: ops.AlterColumnOp
) -> str:
    tname = op.table_name
    cname = op.column_name
    server_default = op.modify_server_default
    type_ = op.modify_type
    nullable = op.modify_nullable
    comment = op.modify_comment
    autoincrement = op.kw.get("autoincrement", None)
    existing_type = op.existing_type
    existing_nullable = op.existing_nullable
    existing_comment = op.existing_comment
    existing_server_default = op.existing_server_default
    schema = op.schema

    indent = " " * 11

    if autogen_context._has_batch:
        template = "%(prefix)salter_column(%(cname)r"
    else:
        template = "%(prefix)salter_column(%(tname)r, %(cname)r"

    text = template % {
        "prefix": _alembic_autogenerate_prefix(autogen_context),
        "tname": tname,
        "cname": cname,
    }
    if existing_type is not None:
        text += ",\n%sexisting_type=%s" % (
            indent,
            _repr_type(existing_type, autogen_context),
        )
    if server_default is not False:
        rendered = _render_server_default(server_default, autogen_context)
        text += ",\n%sserver_default=%s" % (indent, rendered)

    if type_ is not None:
        text += ",\n%stype_=%s" % (indent, _repr_type(type_, autogen_context))
    if nullable is not None:
        text += ",\n%snullable=%r" % (indent, nullable)
    if comment is not False:
        text += ",\n%scomment=%r" % (indent, comment)
    if existing_comment is not None:
        text += ",\n%sexisting_comment=%r" % (indent, existing_comment)
    if nullable is None and existing_nullable is not None:
        text += ",\n%sexisting_nullable=%r" % (indent, existing_nullable)
    if autoincrement is not None:
        text += ",\n%sautoincrement=%r" % (indent, autoincrement)
    if server_default is False and existing_server_default:
        rendered = _render_server_default(
            existing_server_default, autogen_context
        )
        text += ",\n%sexisting_server_default=%s" % (indent, rendered)
    if schema and not autogen_context._has_batch:
        text += ",\n%sschema=%r" % (indent, schema)
    text += ")"
    return text


class _f_name:
    def __init__(self, prefix: str, name: conv) -> None:
        self.prefix = prefix
        self.name = name

    def __repr__(self) -> str:
        return "%sf(%r)" % (self.prefix, _ident(self.name))


def _ident(name: Optional[Union[quoted_name, str]]) -> Optional[str]:
    """produce a __repr__() object for a string identifier that may
    use quoted_name() in SQLAlchemy 0.9 and greater.

    The issue worked around here is that quoted_name() doesn't have
    very good repr() behavior by itself when unicode is involved.

    """
    if name is None:
        return name
    elif isinstance(name, quoted_name):
        return str(name)
    elif isinstance(name, str):
        return name


def _render_potential_expr(
    value: Any,
    autogen_context: AutogenContext,
    *,
    wrap_in_text: bool = True,
    is_server_default: bool = False,
    is_index: bool = False,
) -> str:
    if isinstance(value, sql.ClauseElement):
        if wrap_in_text:
            template = "%(prefix)stext(%(sql)r)"
        else:
            template = "%(sql)r"

        return template % {
            "prefix": _sqlalchemy_autogenerate_prefix(autogen_context),
            "sql": autogen_context.migration_context.impl.render_ddl_sql_expr(
                value, is_server_default=is_server_default, is_index=is_index
            ),
        }

    else:
        return repr(value)


def _get_index_rendered_expressions(
    idx: Index, autogen_context: AutogenContext
) -> List[str]:
    return [
        (
            repr(_ident(getattr(exp, "name", None)))
            if isinstance(exp, sa_schema.Column)
            else _render_potential_expr(exp, autogen_context, is_index=True)
        )
        for exp in idx.expressions
    ]


def _uq_constraint(
    constraint: UniqueConstraint,
    autogen_context: AutogenContext,
    alter: bool,
) -> str:
    opts: List[Tuple[str, Any]] = []

    has_batch = autogen_context._has_batch

    if constraint.deferrable:
        opts.append(("deferrable", str(constraint.deferrable)))
    if constraint.initially:
        opts.append(("initially", str(constraint.initially)))
    if not has_batch and alter and constraint.table.schema:
        opts.append(("schema", _ident(constraint.table.schema)))
    if not alter and constraint.name:
        opts.append(
            ("name", _render_gen_name(autogen_context, constraint.name))
        )
    dialect_options = _render_dialect_kwargs_items(autogen_context, constraint)

    if alter:
        args = [repr(_render_gen_name(autogen_context, constraint.name))]
        if not has_batch:
            args += [repr(_ident(constraint.table.name))]
        args.append(repr([_ident(col.name) for col in constraint.columns]))
        args.extend(["%s=%r" % (k, v) for k, v in opts])
        args.extend(dialect_options)
        return "%(prefix)screate_unique_constraint(%(args)s)" % {
            "prefix": _alembic_autogenerate_prefix(autogen_context),
            "args": ", ".join(args),
        }
    else:
        args = [repr(_ident(col.name)) for col in constraint.columns]
        args.extend(["%s=%r" % (k, v) for k, v in opts])
        args.extend(dialect_options)
        return "%(prefix)sUniqueConstraint(%(args)s)" % {
            "prefix": _sqlalchemy_autogenerate_prefix(autogen_context),
            "args": ", ".join(args),
        }


def _user_autogenerate_prefix(autogen_context, target):
    prefix = autogen_context.opts["user_module_prefix"]
    if prefix is None:
        return "%s." % target.__module__
    else:
        return prefix


def _sqlalchemy_autogenerate_prefix(autogen_context: AutogenContext) -> str:
    return autogen_context.opts["sqlalchemy_module_prefix"] or ""


def _alembic_autogenerate_prefix(autogen_context: AutogenContext) -> str:
    if autogen_context._has_batch:
        return "batch_op."
    else:
        return autogen_context.opts["alembic_module_prefix"] or ""


def _user_defined_render(
    type_: str, object_: Any, autogen_context: AutogenContext
) -> Union[str, Literal[False]]:
    if "render_item" in autogen_context.opts:
        render = autogen_context.opts["render_item"]
        if render:
            rendered = render(type_, object_, autogen_context)
            if rendered is not False:
                return rendered
    return False


def _render_column(
    column: Column[Any], autogen_context: AutogenContext
) -> str:
    rendered = _user_defined_render("column", column, autogen_context)
    if rendered is not False:
        return rendered

    args: List[str] = []
    opts: List[Tuple[str, Any]] = []

    if column.server_default:
        rendered = _render_server_default(  # type:ignore[assignment]
            column.server_default, autogen_context
        )
        if rendered:
            if _should_render_server_default_positionally(
                column.server_default
            ):
                args.append(rendered)
            else:
                opts.append(("server_default", rendered))

    if (
        column.autoincrement is not None
        and column.autoincrement != sqla_compat.AUTOINCREMENT_DEFAULT
    ):
        opts.append(("autoincrement", column.autoincrement))

    if column.nullable is not None:
        opts.append(("nullable", column.nullable))

    if column.system:
        opts.append(("system", column.system))

    comment = column.comment
    if comment:
        opts.append(("comment", "%r" % comment))

    # TODO: for non-ascii colname, assign a "key"
    return "%(prefix)sColumn(%(name)r, %(type)s, %(args)s%(kwargs)s)" % {
        "prefix": _sqlalchemy_autogenerate_prefix(autogen_context),
        "name": _ident(column.name),
        "type": _repr_type(column.type, autogen_context),
        "args": ", ".join([str(arg) for arg in args]) + ", " if args else "",
        "kwargs": (
            ", ".join(
                ["%s=%s" % (kwname, val) for kwname, val in opts]
                + [
                    "%s=%s"
                    % (key, _render_potential_expr(val, autogen_context))
                    for key, val in sqla_compat._column_kwargs(column).items()
                ]
            )
        ),
    }


def _should_render_server_default_positionally(server_default: Any) -> bool:
    return sqla_compat._server_default_is_computed(
        server_default
    ) or sqla_compat._server_default_is_identity(server_default)


def _render_server_default(
    default: Optional[
        Union[FetchedValue, str, TextClause, ColumnElement[Any]]
    ],
    autogen_context: AutogenContext,
    repr_: bool = True,
) -> Optional[str]:
    rendered = _user_defined_render("server_default", default, autogen_context)
    if rendered is not False:
        return rendered

    if sqla_compat._server_default_is_computed(default):
        return _render_computed(cast("Computed", default), autogen_context)
    elif sqla_compat._server_default_is_identity(default):
        return _render_identity(cast("Identity", default), autogen_context)
    elif isinstance(default, sa_schema.DefaultClause):
        if isinstance(default.arg, str):
            default = default.arg
        else:
            return _render_potential_expr(
                default.arg, autogen_context, is_server_default=True
            )

    if isinstance(default, str) and repr_:
        default = repr(re.sub(r"^'|'$", "", default))

    return cast(str, default)


def _render_computed(
    computed: Computed, autogen_context: AutogenContext
) -> str:
    text = _render_potential_expr(
        computed.sqltext, autogen_context, wrap_in_text=False
    )

    kwargs = {}
    if computed.persisted is not None:
        kwargs["persisted"] = computed.persisted
    return "%(prefix)sComputed(%(text)s, %(kwargs)s)" % {
        "prefix": _sqlalchemy_autogenerate_prefix(autogen_context),
        "text": text,
        "kwargs": (", ".join("%s=%s" % pair for pair in kwargs.items())),
    }


def _render_identity(
    identity: Identity, autogen_context: AutogenContext
) -> str:
    kwargs = sqla_compat._get_identity_options_dict(
        identity, dialect_kwargs=True
    )

    return "%(prefix)sIdentity(%(kwargs)s)" % {
        "prefix": _sqlalchemy_autogenerate_prefix(autogen_context),
        "kwargs": (", ".join("%s=%s" % pair for pair in kwargs.items())),
    }


def _repr_type(
    type_: TypeEngine,
    autogen_context: AutogenContext,
    _skip_variants: bool = False,
) -> str:
    rendered = _user_defined_render("type", type_, autogen_context)
    if rendered is not False:
        return rendered

    if hasattr(autogen_context.migration_context, "impl"):
        impl_rt = autogen_context.migration_context.impl.render_type(
            type_, autogen_context
        )
    else:
        impl_rt = None

    mod = type(type_).__module__
    imports = autogen_context.imports
    if mod.startswith("sqlalchemy.dialects"):
        match = re.match(r"sqlalchemy\.dialects\.(\w+)", mod)
        assert match is not None
        dname = match.group(1)
        if imports is not None:
            imports.add("from sqlalchemy.dialects import %s" % dname)
        if impl_rt:
            return impl_rt
        else:
            return "%s.%r" % (dname, type_)
    elif impl_rt:
        return impl_rt
    elif not _skip_variants and sqla_compat._type_has_variants(type_):
        return _render_Variant_type(type_, autogen_context)
    elif mod.startswith("sqlalchemy."):
        if "_render_%s_type" % type_.__visit_name__ in globals():
            fn = globals()["_render_%s_type" % type_.__visit_name__]
            return fn(type_, autogen_context)
        else:
            prefix = _sqlalchemy_autogenerate_prefix(autogen_context)
            return "%s%r" % (prefix, type_)
    else:
        prefix = _user_autogenerate_prefix(autogen_context, type_)
        return "%s%r" % (prefix, type_)


def _render_ARRAY_type(type_: ARRAY, autogen_context: AutogenContext) -> str:
    return cast(
        str,
        _render_type_w_subtype(
            type_, autogen_context, "item_type", r"(.+?\()"
        ),
    )


def _render_Variant_type(
    type_: TypeEngine, autogen_context: AutogenContext
) -> str:
    base_type, variant_mapping = sqla_compat._get_variant_mapping(type_)
    base = _repr_type(base_type, autogen_context, _skip_variants=True)
    assert base is not None and base is not False  # type: ignore[comparison-overlap]  # noqa:E501
    for dialect in sorted(variant_mapping):
        typ = variant_mapping[dialect]
        base += ".with_variant(%s, %r)" % (
            _repr_type(typ, autogen_context, _skip_variants=True),
            dialect,
        )
    return base


def _render_type_w_subtype(
    type_: TypeEngine,
    autogen_context: AutogenContext,
    attrname: str,
    regexp: str,
    prefix: Optional[str] = None,
) -> Union[Optional[str], Literal[False]]:
    outer_repr = repr(type_)
    inner_type = getattr(type_, attrname, None)
    if inner_type is None:
        return False

    inner_repr = repr(inner_type)

    inner_repr = re.sub(r"([\(\)])", r"\\\1", inner_repr)
    sub_type = _repr_type(getattr(type_, attrname), autogen_context)
    outer_type = re.sub(regexp + inner_repr, r"\1%s" % sub_type, outer_repr)

    if prefix:
        return "%s%s" % (prefix, outer_type)

    mod = type(type_).__module__
    if mod.startswith("sqlalchemy.dialects"):
        match = re.match(r"sqlalchemy\.dialects\.(\w+)", mod)
        assert match is not None
        dname = match.group(1)
        return "%s.%s" % (dname, outer_type)
    elif mod.startswith("sqlalchemy"):
        prefix = _sqlalchemy_autogenerate_prefix(autogen_context)
        return "%s%s" % (prefix, outer_type)
    else:
        return None


_constraint_renderers = util.Dispatcher()


def _render_constraint(
    constraint: Constraint,
    autogen_context: AutogenContext,
    namespace_metadata: Optional[MetaData],
) -> Optional[str]:
    try:
        renderer = _constraint_renderers.dispatch(constraint)
    except ValueError:
        util.warn("No renderer is established for object %r" % constraint)
        return "[Unknown Python object %r]" % constraint
    else:
        return renderer(constraint, autogen_context, namespace_metadata)


@_constraint_renderers.dispatch_for(sa_schema.PrimaryKeyConstraint)
def _render_primary_key(
    constraint: PrimaryKeyConstraint,
    autogen_context: AutogenContext,
    namespace_metadata: Optional[MetaData],
) -> Optional[str]:
    rendered = _user_defined_render("primary_key", constraint, autogen_context)
    if rendered is not False:
        return rendered

    if not constraint.columns:
        return None

    opts = []
    if constraint.name:
        opts.append(
            ("name", repr(_render_gen_name(autogen_context, constraint.name)))
        )
    return "%(prefix)sPrimaryKeyConstraint(%(args)s)" % {
        "prefix": _sqlalchemy_autogenerate_prefix(autogen_context),
        "args": ", ".join(
            [repr(c.name) for c in constraint.columns]
            + ["%s=%s" % (kwname, val) for kwname, val in opts]
        ),
    }


def _fk_colspec(
    fk: ForeignKey,
    metadata_schema: Optional[str],
    namespace_metadata: MetaData,
) -> str:
    """Implement a 'safe' version of ForeignKey._get_colspec() that
    won't fail if the remote table can't be resolved.

    """
    colspec = fk._get_colspec()
    tokens = colspec.split(".")
    tname, colname = tokens[-2:]

    if metadata_schema is not None and len(tokens) == 2:
        table_fullname = "%s.%s" % (metadata_schema, tname)
    else:
        table_fullname = ".".join(tokens[0:-1])

    if (
        not fk.link_to_name
        and fk.parent is not None
        and fk.parent.table is not None
    ):
        # try to resolve the remote table in order to adjust for column.key.
        # the FK constraint needs to be rendered in terms of the column
        # name.

        if table_fullname in namespace_metadata.tables:
            col = namespace_metadata.tables[table_fullname].c.get(colname)
            if col is not None:
                colname = _ident(col.name)  # type: ignore[assignment]

    colspec = "%s.%s" % (table_fullname, colname)

    return colspec


def _populate_render_fk_opts(
    constraint: ForeignKeyConstraint, opts: List[Tuple[str, str]]
) -> None:
    if constraint.onupdate:
        opts.append(("onupdate", repr(constraint.onupdate)))
    if constraint.ondelete:
        opts.append(("ondelete", repr(constraint.ondelete)))
    if constraint.initially:
        opts.append(("initially", repr(constraint.initially)))
    if constraint.deferrable:
        opts.append(("deferrable", repr(constraint.deferrable)))
    if constraint.use_alter:
        opts.append(("use_alter", repr(constraint.use_alter)))
    if constraint.match:
        opts.append(("match", repr(constraint.match)))


@_constraint_renderers.dispatch_for(sa_schema.ForeignKeyConstraint)
def _render_foreign_key(
    constraint: ForeignKeyConstraint,
    autogen_context: AutogenContext,
    namespace_metadata: MetaData,
) -> Optional[str]:
    rendered = _user_defined_render("foreign_key", constraint, autogen_context)
    if rendered is not False:
        return rendered

    opts = []
    if constraint.name:
        opts.append(
            ("name", repr(_render_gen_name(autogen_context, constraint.name)))
        )

    _populate_render_fk_opts(constraint, opts)

    apply_metadata_schema = namespace_metadata.schema
    return (
        "%(prefix)sForeignKeyConstraint([%(cols)s], "
        "[%(refcols)s], %(args)s)"
        % {
            "prefix": _sqlalchemy_autogenerate_prefix(autogen_context),
            "cols": ", ".join(
                repr(_ident(f.parent.name)) for f in constraint.elements
            ),
            "refcols": ", ".join(
                repr(_fk_colspec(f, apply_metadata_schema, namespace_metadata))
                for f in constraint.elements
            ),
            "args": ", ".join(
                ["%s=%s" % (kwname, val) for kwname, val in opts]
            ),
        }
    )


@_constraint_renderers.dispatch_for(sa_schema.UniqueConstraint)
def _render_unique_constraint(
    constraint: UniqueConstraint,
    autogen_context: AutogenContext,
    namespace_metadata: Optional[MetaData],
) -> str:
    rendered = _user_defined_render("unique", constraint, autogen_context)
    if rendered is not False:
        return rendered

    return _uq_constraint(constraint, autogen_context, False)


@_constraint_renderers.dispatch_for(sa_schema.CheckConstraint)
def _render_check_constraint(
    constraint: CheckConstraint,
    autogen_context: AutogenContext,
    namespace_metadata: Optional[MetaData],
) -> Optional[str]:
    rendered = _user_defined_render("check", constraint, autogen_context)
    if rendered is not False:
        return rendered

    # detect the constraint being part of
    # a parent type which is probably in the Table already.
    # ideally SQLAlchemy would give us more of a first class
    # way to detect this.
    if (
        constraint._create_rule
        and hasattr(constraint._create_rule, "target")
        and isinstance(
            constraint._create_rule.target,
            sqltypes.TypeEngine,
        )
    ):
        return None
    opts = []
    if constraint.name:
        opts.append(
            ("name", repr(_render_gen_name(autogen_context, constraint.name)))
        )
    return "%(prefix)sCheckConstraint(%(sqltext)s%(opts)s)" % {
        "prefix": _sqlalchemy_autogenerate_prefix(autogen_context),
        "opts": (
            ", " + (", ".join("%s=%s" % (k, v) for k, v in opts))
            if opts
            else ""
        ),
        "sqltext": _render_potential_expr(
            constraint.sqltext, autogen_context, wrap_in_text=False
        ),
    }


@renderers.dispatch_for(ops.ExecuteSQLOp)
def _execute_sql(autogen_context: AutogenContext, op: ops.ExecuteSQLOp) -> str:
    if not isinstance(op.sqltext, str):
        raise NotImplementedError(
            "Autogenerate rendering of SQL Expression language constructs "
            "not supported here; please use a plain SQL string"
        )
    return "op.execute(%r)" % op.sqltext


renderers = default_renderers.branch()
